package com.hery.functions;

import com.alibaba.fastjson.JSONArray;
import com.hery.utils.Config;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.util.Collector;

@Slf4j
public class LogBeanFlatMapFunction extends RichFlatMapFunction<String, String> {

    @Override
    public void flatMap(String value, Collector<String> out) {
        try {

            if (Config.KAFKA_CONSUMER_TOPIC_LIST.contains("events")) {

                String s = JSONArray.parseArray(value).get(2).toString();
                out.collect(s);
            }else{
                out.collect(value);
            }

        } catch (Exception e) {
            log.error("解析失败：{} , Error Message:{}" , value, e.getMessage());
            e.printStackTrace();
        }
    }
}
